
Conversation

@tac0turtle
Contributor

Overview

This PR adds sync modes, allowing us to move to subscriptions when at the head of the chain.

@claude
Contributor

claude bot commented Jan 8, 2026

Claude finished @tac0turtle's task in 2m 33s


Code Review Complete

I've performed a comprehensive code review of the sync mode implementation. This is a well-architected feature that adds intelligent dual-mode synchronization. Below is my detailed analysis.


Overview Assessment

This PR introduces a sophisticated sync mode system that allows nodes to dynamically switch between:

  • Catchup mode: Aggressive polling when behind the DA chain head
  • Follow mode: Subscription-based real-time updates when caught up

The implementation is generally solid with good architectural decisions. However, I've identified several critical issues and areas for improvement.


Critical Issues

1. Potential Data Race in processSubscriptionResponse (syncer.go:566-579)

The loop iterates over events and takes the address of the loop variable event when caching:

for _, event := range events {
    select {
    case s.heightInCh <- event:
        // ...
    default:
        s.cache.SetPendingEvent(event.Header.Height(), &event)  // ⚠️ Taking address of loop variable

Issue: Under Go 1.22+ loop semantics, each iteration declares a fresh event variable, so taking &event is safe. But if the module's go directive is below 1.22, every iteration reuses a single memory location, and all cached pointers will alias the final iteration's value.

Impact: HIGH if the module targets Go < 1.22 - multiple cached events would reference the same (last) event, causing data loss and sync failures. Verify the go directive before dismissing this.

Fix (required only for pre-1.22 modules, harmless otherwise): add an explicit capture:

for _, event := range events {
    event := event  // Explicit capture
    select {
    case s.heightInCh <- event:
        // ...
    default:
        s.cache.SetPendingEvent(event.Header.Height(), &event)


Location: block/internal/syncing/syncer.go:566-579


2. Missing Timeout on Subscription Context (syncer.go:452-543)

The subscription in subscribeAndFollow() creates a child context but has no overall timeout, relying only on the watchdog timer:

subCtx, cancel := context.WithCancel(s.ctx)
defer cancel()

headerCh, err := s.daClient.Subscribe(subCtx, headerNS)

Issue: If the subscription hangs during creation or the watchdog never fires (e.g., one namespace keeps sending events), the goroutine could be stuck indefinitely.

Impact: MEDIUM - Potential goroutine leak and inability to switch back to catchup mode.

Recommendation: Consider adding a maximum subscription duration or ensure the watchdog is robust enough to detect all stuck scenarios.
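A minimal sketch of the first option, assuming a new maxSubscriptionDuration constant (the 10-minute value is illustrative, not derived from this PR):

const maxSubscriptionDuration = 10 * time.Minute // assumed constant

// WithTimeout instead of WithCancel: a stuck subscription now ends after
// maxSubscriptionDuration even if the watchdog never fires, and the next
// mode check decides whether to resubscribe.
subCtx, cancel := context.WithTimeout(s.ctx, maxSubscriptionDuration)
defer cancel()

headerCh, err := s.daClient.Subscribe(subCtx, headerNS)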


3. Race Condition in Mode Determination (syncer.go:363-374)

Mode switches are tracked but there's a potential race between determineSyncMode() calls and the actual mode switch:

mode := s.determineSyncMode()
previousMode := SyncMode(s.currentSyncMode.Load())

if mode != previousMode {
    s.currentSyncMode.Store(int32(mode))
    s.metrics.ModeSwitches.Add(1)

Issue: Between determining the mode and storing it, another goroutine could change the mode, leading to incorrect metrics and logging.

Impact: LOW - Metrics and logs may be slightly inaccurate, but this won't affect the correctness of the sync operation.

Recommendation: Consider compare-and-swap for mode updates or accept the minor race (acceptable for metrics).
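If the stronger guarantee is preferred, a compare-and-swap sketch (assuming currentSyncMode is an atomic.Int32 and SyncMode has a String method):

mode := s.determineSyncMode()
previousMode := SyncMode(s.currentSyncMode.Load())

// Only the goroutine that wins the CAS records the switch, so the metric
// and log cannot double-count a transition raced by another goroutine.
if mode != previousMode && s.currentSyncMode.CompareAndSwap(int32(previousMode), int32(mode)) {
    s.metrics.ModeSwitches.Add(1)
    s.logger.Info().
        Str("from", previousMode.String()). // String() is assumed
        Str("to", mode.String()).
        Msg("sync mode switched")
}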


High-Priority Issues

4. Insufficient Error Context in subscribeAndFollow (syncer.go:440-447)

Subscription failures increment a metric but don't provide detailed context for debugging:

err := s.subscribeAndFollow()
if err != nil && !errors.Is(err, context.Canceled) {
    s.metrics.SubscribeErrors.Add(1)
    s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check")

Issue: No context about which namespace failed, what the network state was, or how many times this has failed in a row.

Recommendation: Add structured logging fields (namespace, DA height, consecutive failures) to aid debugging production issues.
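For example (the consecutiveFails counter and the da_height field are assumptions, not existing code):

err := s.subscribeAndFollow()
if err != nil && !errors.Is(err, context.Canceled) {
    consecutiveFails++ // hypothetical counter, reset after a successful subscription
    s.metrics.SubscribeErrors.Add(1)
    s.logger.Warn().
        Err(err).
        Uint64("da_height", s.daHeight.Load()). // assumed syncer field
        Int("consecutive_failures", consecutiveFails).
        Msg("subscribe failed, will retry via mode check")
}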


5. LocalHead Error Handling Could Be More Robust (syncer.go:386-406)

When LocalHead() fails, the code defaults to catchup mode silently:

localHead, err := s.daClient.LocalHead(s.ctx)
if err != nil {
    s.logger.Debug().Err(err).Msg("failed to get local DA head, defaulting to catchup mode")
    return SyncModeCatchup
}

Issue: This masks potentially serious DA client issues. If LocalHead consistently fails, the node will never enter follow mode.

Recommendation (a combined sketch follows the list):

  • Track consecutive LocalHead failures
  • Emit a metric for LocalHead errors
  • After N consecutive failures, log at WARN/ERROR level
  • Consider adding a health check endpoint
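A sketch combining the first three points, assuming a new atomic failure counter and a LocalHeadErrors metric (both hypothetical):

localHead, err := s.daClient.LocalHead(s.ctx)
if err != nil {
    n := s.localHeadFailures.Add(1)  // hypothetical atomic.Int64 on the syncer
    s.metrics.LocalHeadErrors.Add(1) // hypothetical new metric
    evt := s.logger.Debug()
    if n >= 5 { // escalation threshold is illustrative
        evt = s.logger.Warn()
    }
    evt.Err(err).Int64("consecutive_failures", n).Msg("failed to get local DA head, defaulting to catchup mode")
    return SyncModeCatchup
}
s.localHeadFailures.Store(0) // healthy again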

6. ProcessBlobs Type-Assertion Concern (syncer.go:563)

events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)

The daRetriever field is typed as the DARetriever interface, and ProcessBlobs is correctly part of that interface. The Gemini bot's comment about a type-assertion issue likely referred to an earlier revision of this PR.

Current Status: ✅ This is actually correct - ProcessBlobs is part of the DARetriever interface (da_retriever.go:24-26).


Medium-Priority Issues

7. Test Coverage is Low (Codecov Report)

Codecov shows 36.25% patch coverage with 109 lines missing:

  • syncer.go: 44.44% coverage (74 missing, 1 partial)
  • tools/local-da/rpc.go: 0% coverage (22 missing)
  • block/internal/da/client.go: 0% coverage (12 missing)

Issue: Critical paths like subscription handling, mode switching, and error scenarios lack test coverage.

Recommendation: Add tests for the following (the first item is sketched after the list):

  • Mode switch logic with various DA height scenarios
  • Subscription failure and recovery
  • Watchdog timeout scenarios
  • Loop variable capture in processSubscriptionResponse
  • LocalHead errors and fallback behavior
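A table-driven sketch for the first item, assuming a newTestSyncer helper that stubs the DA client's LocalHead and the syncer's processed DA height (boundary cases deliberately avoid assuming the exact threshold comparison):

func TestDetermineSyncMode(t *testing.T) {
    cases := []struct {
        name      string
        localHead uint64 // stubbed DA local head
        daHeight  uint64 // syncer's processed DA height
        want      SyncMode
    }{
        {"far behind", 100, 50, SyncModeCatchup},
        {"at head", 100, 100, SyncModeFollow},
        {"within threshold", 100, 99, SyncModeFollow},
        {"past threshold", 100, 97, SyncModeCatchup},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            s := newTestSyncer(t, tc.localHead, tc.daHeight) // hypothetical helper
            if got := s.determineSyncMode(); got != tc.want {
                t.Fatalf("determineSyncMode() = %v, want %v", got, tc.want)
            }
        })
    }
}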

8. Watchdog Timeout Calculation (syncer.go:493-496)

watchdogTimeout := s.config.DA.BlockTime.Duration * followWatchdogMultiplier
if watchdogTimeout <= 0 {
    watchdogTimeout = 30 * time.Second
}

Issue: If BlockTime is very small (e.g., 100ms for fast test chains), watchdogTimeout could be only 300ms, causing false positives.

Recommendation: Enforce a minimum watchdog timeout (e.g., 5 seconds) regardless of block time.
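For example (the 5-second floor is illustrative):

const minWatchdogTimeout = 5 * time.Second // assumed floor

watchdogTimeout := s.config.DA.BlockTime.Duration * followWatchdogMultiplier
if watchdogTimeout < minWatchdogTimeout {
    // Covers both the non-positive case and very fast (test) block times.
    watchdogTimeout = minWatchdogTimeout
}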


9. Missing Cancellation on Subscription Errors (syncer.go:462-474)

If one subscription succeeds but another fails, the successful subscription's context isn't explicitly cancelled in the error path:

headerCh, err := s.daClient.Subscribe(subCtx, headerNS)
if err != nil {
    return fmt.Errorf("failed to subscribe to header namespace: %w", err)
}

// If this fails, headerCh's subscription is still active via subCtx
dataCh, err := s.daClient.Subscribe(subCtx, dataNS)
if err != nil {
    return fmt.Errorf("failed to subscribe to data namespace: %w", err)
}

Current Status: Actually OK - the defer cancel() on line 459 will clean up all subscriptions on any error. But it's not immediately obvious.

Recommendation: Add a comment explaining that defer handles cleanup.
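Something along these lines would do:

// subCtx is shared by every Subscribe call below. The deferred cancel tears
// down all active subscriptions on any return path, including the
// early-error returns between the Subscribe calls.
subCtx, cancel := context.WithCancel(s.ctx)
defer cancel()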


10. LocalDA RPC Missing Implementation (tools/local-da/rpc.go:133-137)

The LocalDA test harness returns a closed channel for Subscribe:

func (s *blobServer) Subscribe(_ context.Context, _ libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) {
    ch := make(chan *jsonrpc.SubscriptionResponse)
    close(ch)
    return ch, nil
}

Issue: This will cause follow mode tests to immediately fail with "subscription closed" error.

Recommendation: Implement a basic subscription mechanism for testing, even if it's just a timer-based mock.
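A timer-based sketch (the one-second tick and the nextHeight helper are assumptions):

func (s *blobServer) Subscribe(ctx context.Context, _ libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) {
    ch := make(chan *jsonrpc.SubscriptionResponse)
    go func() {
        defer close(ch)
        ticker := time.NewTicker(time.Second) // simulated block interval
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                resp := &jsonrpc.SubscriptionResponse{Height: s.nextHeight()} // nextHeight is hypothetical
                select {
                case ch <- resp:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return ch, nil
}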


Code Quality Issues

11. Magic Numbers

  • catchupThreshold = 2 (line 57): Why 2 blocks? Should this be configurable?
  • followWatchdogMultiplier = 3 (line 61): Why 3x block time?
  • Hardcoded 30 * time.Second fallback (line 495)

Recommendation: Add comments explaining the rationale, or make these configurable.
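If made configurable, a sketch with field and key names assumed:

// Hypothetical config additions; defaults preserve today's behavior.
type SyncConfig struct {
    // CatchupThreshold is how many DA blocks behind local head triggers catchup mode. Default: 2.
    CatchupThreshold uint64 `mapstructure:"catchup_threshold"`
    // FollowWatchdogMultiplier scales DA block time into the follow-mode watchdog timeout. Default: 3.
    FollowWatchdogMultiplier int `mapstructure:"follow_watchdog_multiplier"`
}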


12. Redundant Nil Check (syncer.go:513)

case resp, ok := <-dataCh:
    // Note: if dataCh is nil (same namespace as header), this case never fires
    if !ok {
        return errors.New("data subscription closed")

The comment correctly notes that a receive from a nil channel blocks forever, so this case simply never fires when dataCh is nil. The !ok check tests for channel closure, not nilness, and is still needed when dataCh is non-nil.

Status: ✅ Code is correct as-is.


Security Considerations

13. No Rate Limiting on Subscription Events

The subscription processing loop has no rate limiting:

case resp, ok := <-headerCh:
    if !ok {
        return errors.New("header subscription closed")
    }
    if err := s.processSubscriptionResponse(resp); err != nil {
        s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process header subscription")
    }

Issue: A malicious or buggy DA node could flood subscriptions with events, overwhelming the syncer.

Recommendation (the first point is sketched after the list):

  • Add rate limiting to subscription event processing
  • Track subscription event rate and emit metrics
  • Consider rejecting suspiciously high event rates
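A sketch of the first point using golang.org/x/time/rate (the 100 events/s limit and burst of 200 are illustrative):

// Created once before the select loop.
limiter := rate.NewLimiter(rate.Limit(100), 200)

// Inside the loop, before handing the event to the processor:
case resp, ok := <-headerCh:
    if !ok {
        return errors.New("header subscription closed")
    }
    if err := limiter.Wait(subCtx); err != nil {
        return err // subscription context cancelled while throttled
    }
    if err := s.processSubscriptionResponse(resp); err != nil {
        s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process header subscription")
    }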

14. Subscription Context Lifecycle

The subscription context is cancelled when the function returns, but there's no explicit tracking of subscription goroutines:

subCtx, cancel := context.WithCancel(s.ctx)
defer cancel()

Status: ✅ This is correct - cancelling the context will cause the subscription channels to close.


Performance Considerations

15. Subscription Response Processing

Each subscription event processes blobs synchronously:

if err := s.processSubscriptionResponse(resp); err != nil {
    s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process header subscription")
}

Issue: Large blob batches could block the subscription loop, preventing other subscriptions from being processed.

Recommendation: Consider processing blobs in a worker pool or at least with a timeout.
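A sketch of the timeout option, assuming a hypothetical context-aware variant of processSubscriptionResponse:

// Give each event a fixed processing budget so one large blob batch cannot
// stall the subscription loop. The 10-second budget is an assumption.
procCtx, cancelProc := context.WithTimeout(subCtx, 10*time.Second)
err := s.processSubscriptionResponseCtx(procCtx, resp) // hypothetical ctx-aware variant
cancelProc()
if err != nil {
    s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process header subscription")
}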


16. Memory Usage in ProcessBlobs

The ProcessBlobs method accumulates events in a slice that grows unbounded within a single DA height:

var events []common.DAHeightEvent
// ...
events = append(events, event)

Issue: A single DA height with thousands of blocks could cause memory spikes.

Recommendation: Process and send events incrementally rather than accumulating all events before sending.
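One way to do that is a callback-based variant (ProcessBlobsFunc and decodeBlob are hypothetical names):

// Events stream out as they are decoded instead of accumulating in a slice.
func (r *daRetriever) ProcessBlobsFunc(ctx context.Context, blobs [][]byte, height uint64, emit func(common.DAHeightEvent) error) error {
    for _, blob := range blobs {
        event, ok := r.decodeBlob(blob, height) // hypothetical per-blob decode step
        if !ok {
            continue // skip blobs that don't decode to an event
        }
        if err := emit(event); err != nil {
            return err
        }
    }
    return nil
}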


Architecture & Design

17. Excellent Separation of Concerns

The PR correctly separates:

  • Mode determination logic (determineSyncMode)
  • Polling logic (runCatchupMode)
  • Subscription logic (runFollowMode)
  • Blob processing (daRetriever.ProcessBlobs)

This makes the code testable and maintainable.


18. Good Interface Design

Adding Subscribe and LocalHead to the Client interface is the right approach:

type Client interface {
    // ...
    Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error)
    LocalHead(ctx context.Context) (uint64, error)
}

This maintains testability and allows for different DA implementations.


19. Metrics Coverage is Good

The new metrics provide good observability:

  • SyncMode: Current mode (catchup=0, follow=1)
  • SubscribeErrors: Subscription failure count
  • ModeSwitches: Number of mode transitions

Recommendation: Consider adding the following (the first and third are sketched after the list):

  • subscription_duration_seconds: Histogram of how long subscriptions stay active
  • catchup_iterations_total: Number of catchup loop iterations
  • da_height_lag: Current DA height minus syncer's DA height
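A sketch of the first and third, assuming the metrics struct wraps Prometheus collectors directly:

SubscriptionDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
    Name:    "subscription_duration_seconds",
    Help:    "How long each follow-mode subscription stayed active.",
    Buckets: prometheus.ExponentialBuckets(1, 2, 12), // 1s up to ~34m
}),
DAHeightLag: prometheus.NewGauge(prometheus.GaugeOpts{
    Name: "da_height_lag",
    Help: "Local DA head height minus the syncer's last processed DA height.",
}),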

Testing Recommendations

20. Missing Test Scenarios

The PR adds syncer_test.go changes but critical scenarios are untested:

  1. Mode switching: Test catchup → follow → catchup transitions
  2. Subscription failures: Test recovery from subscription errors
  3. Watchdog triggering: Test watchdog timeout scenarios
  4. Loop variable capture: Test that cached events have correct data
  5. Concurrent events: Test multiple events arriving simultaneously
  6. LocalHead failures: Test behavior when LocalHead consistently fails

Recommendation: Add integration tests covering these scenarios.


Documentation

21. Well-Documented Constants

The constants have clear documentation:

// catchupThreshold is the number of DA blocks behind local head
// before switching from follow to catchup mode.
catchupThreshold = 2

22. Missing Package-Level Documentation

The new sync mode behavior isn't documented at the package level.

Recommendation: Add a package comment or design doc explaining:

  • When and why modes switch
  • Performance characteristics of each mode
  • Expected behavior during network partitions
  • Fallback strategies

Summary & Recommendations

Must Fix Before Merge (Critical)

  1. Loop variable capture (syncer.go:566-579) - Verify the module's go directive; add event := event if it targets Go < 1.22
  2. Improve test coverage - Add tests for critical paths (target >70%)
  3. Implement LocalDA Subscribe - Enable follow mode testing

Should Fix Before Merge (High Priority)

  1. Add structured logging for subscription failures
  2. Track and alert on consecutive LocalHead failures
  3. Document sync mode behavior at package level
  4. Add minimum watchdog timeout enforcement

Consider for Follow-Up

  1. Add rate limiting to subscription processing
  2. Add incremental event processing to reduce memory spikes
  3. Make thresholds configurable (catchupThreshold, watchdogMultiplier)
  4. Add additional metrics (subscription_duration, da_height_lag)

Positive Highlights

  • Excellent architectural separation between catchup and follow modes
  • Well-designed interface extensions for Subscribe and LocalHead
  • Good use of atomic operations for thread-safe mode tracking
  • Comprehensive metrics for observability
  • Clean error handling with appropriate error wrapping
  • Watchdog mechanism prevents follow mode from getting stuck
  • Proper context handling throughout subscription lifecycle

Overall Assessment

This is a well-architected feature that significantly improves DA synchronization efficiency. The dual-mode approach is sound, and the implementation demonstrates good understanding of concurrency patterns.

Primary concerns:

  1. The loop variable capture issue is critical if the module targets Go < 1.22 and must be verified
  2. Test coverage needs improvement (currently 36.25%)
  3. LocalDA test harness needs Subscribe implementation

Once these issues are addressed, this will be a solid addition to the codebase.

Recommendation: Request changes for the loop variable verification and test coverage improvements.


@github-actions
Contributor

github-actions bot commented Jan 8, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed | Format: ⏩ skipped | Lint: ✅ passed | Breaking: ✅ passed | Updated (UTC): Jan 14, 2026, 8:35 PM

@gemini-code-assist
Contributor

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data availability (DA) worker's synchronization capabilities by introducing dynamic sync modes. It allows the system to efficiently transition between an aggressive polling strategy when catching up to the DA chain and a more resource-optimized, subscription-based approach once it reaches the chain head. This change aims to improve the overall performance and responsiveness of DA data processing.

Highlights

  • Dynamic Sync Modes: Introduced two dynamic synchronization modes for the DA worker: 'Catchup' for aggressive polling when behind the chain head, and 'Follow' for efficient, subscription-based real-time updates when caught up.
  • Subscription-based Updates: Implemented a new Subscribe method in the DA client, enabling the 'Follow' mode to receive real-time blob notifications, significantly reducing polling overhead.
  • Local Head Tracking: Added a LocalHead method to the DA client and a corresponding RPC endpoint, allowing the syncer to determine its current position relative to the DA chain head and facilitate mode switching.
  • New Metrics for Observability: Integrated new Prometheus metrics (SyncMode, SubscribeErrors, ModeSwitches) to provide visibility into the current sync mode, subscription failures, and transitions between modes.
  • Refactored DA Worker Loop: The daWorkerLoop has been refactored to intelligently determine and switch between 'Catchup' and 'Follow' modes based on the node's synchronization status, including a watchdog mechanism for 'Follow' mode.



@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a dual-mode synchronization mechanism, allowing the node to switch between an aggressive polling 'catchup' mode and a more efficient subscription-based 'follow' mode. This is a significant enhancement for nodes that are at the head of the chain. The changes are well-structured, introducing new DA client methods, metrics, and the core state machine logic in the daWorkerLoop. My review identified two critical bugs related to incorrect loop variable capturing that could lead to data corruption, and a couple of medium-severity design and style issues. Once these points are addressed, the implementation will be much more robust.

@codecov

codecov bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 35.42857% with 113 lines in your changes missing coverage. Please review.
✅ Project coverage is 57.32%. Comparing base (34034c1) to head (127440b).

Files with missing lines | Patch % | Lines
block/internal/syncing/syncer.go | 44.44% | 74 Missing and 1 partial ⚠️
tools/local-da/rpc.go | 0.00% | 22 Missing ⚠️
block/internal/da/client.go | 0.00% | 12 Missing ⚠️
block/internal/da/tracing.go | 0.00% | 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2961      +/-   ##
==========================================
- Coverage   57.77%   57.32%   -0.45%     
==========================================
  Files          98       98              
  Lines        9394     9549     +155     
==========================================
+ Hits         5427     5474      +47     
- Misses       3364     3471     +107     
- Partials      603      604       +1     
Flag | Coverage Δ
combined | 57.32% <35.42%> (-0.45%) ⬇️

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@tac0turtle tac0turtle force-pushed the marko/sync_subscribe branch from 95aeea4 to ecfcf83 on January 12, 2026 at 08:22
@tac0turtle tac0turtle marked this pull request as ready for review on January 12, 2026 at 08:37
@julienrbrt
Member

CI is not so glad.

@tac0turtle
Contributor Author

CI is not so glad.

fixed

}

// Subscribe to forced inclusion namespace if configured
var forcedInclusionCh <-chan *blobrpc.SubscriptionResponse
Member

We don't need to follow the forced inclusion namespace; the retriever does its own caching. Maybe we should align this logic in the forced inclusion retriever as well, instead of using the async block fetching (in da).

s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription")
}

case resp, ok := <-forcedInclusionCh:
Member

ditto, this is dead code
